agentmux_srv\backend\wshutil/
cmdreader.rs

1#![allow(dead_code)]
2// Copyright 2025-2026, AgentMux Corp.
3// SPDX-License-Identifier: Apache-2.0
4
5//! CLI stdin reader for WSH command-line interface.
6//! Port of Go's `pkg/wshutil/wshcmdreader.go`.
7//!
8//! Reads JSON-RPC messages from stdin for the `wsh` CLI tool.
9//! Supports both single-shot commands and interactive mode.
10
11
12use std::io::{BufRead, BufReader, Read};
13use tokio::sync::mpsc;
14
15/// CLI stdin reader that parses JSON-RPC messages from standard input.
16pub struct CmdReader {
17    /// Channel to send parsed messages.
18    pub msg_tx: mpsc::Sender<Vec<u8>>,
19}
20
21impl CmdReader {
22    /// Create a new CmdReader with the given message sender.
23    pub fn new(msg_tx: mpsc::Sender<Vec<u8>>) -> Self {
24        Self { msg_tx }
25    }
26
27    /// Read a single JSON message from stdin (blocking).
28    pub fn read_single_message(input: impl Read) -> Result<Vec<u8>, String> {
29        let mut reader = BufReader::new(input);
30        let mut line = String::new();
31        reader
32            .read_line(&mut line)
33            .map_err(|e| format!("read error: {}", e))?;
34
35        let trimmed = line.trim();
36        if trimmed.is_empty() {
37            return Err("empty input".to_string());
38        }
39
40        // Validate JSON
41        serde_json::from_str::<serde_json::Value>(trimmed)
42            .map_err(|e| format!("invalid JSON: {}", e))?;
43
44        Ok(trimmed.as_bytes().to_vec())
45    }
46
47    /// Start reading JSON lines from stdin in a background thread.
48    /// Each line is sent to the message channel.
49    /// Returns a handle to the reader thread.
50    pub fn start_reading(
51        &self,
52        input: impl Read + Send + 'static,
53    ) -> std::thread::JoinHandle<Result<(), String>> {
54        let tx = self.msg_tx.clone();
55        std::thread::spawn(move || {
56            let reader = BufReader::new(input);
57            for line in reader.lines() {
58                let line = line.map_err(|e| format!("read error: {}", e))?;
59                let trimmed = line.trim().to_string();
60                if trimmed.is_empty() {
61                    continue;
62                }
63
64                // Validate JSON before sending
65                if serde_json::from_str::<serde_json::Value>(&trimmed).is_err() {
66                    tracing::warn!("skipping invalid JSON line: {}", &trimmed[..trimmed.len().min(100)]);
67                    continue;
68                }
69
70                tx.blocking_send(trimmed.into_bytes())
71                    .map_err(|e| format!("channel send error: {}", e))?;
72            }
73            Ok(())
74        })
75    }
76
77    /// Read all available input from stdin and return as a single message.
78    /// Used for piped input (non-interactive).
79    pub fn read_all(input: impl Read) -> Result<Vec<u8>, String> {
80        let mut buf = String::new();
81        let mut reader = BufReader::new(input);
82        reader
83            .read_to_string(&mut buf)
84            .map_err(|e| format!("read error: {}", e))?;
85
86        let trimmed = buf.trim();
87        if trimmed.is_empty() {
88            return Err("empty input".to_string());
89        }
90
91        Ok(trimmed.as_bytes().to_vec())
92    }
93}
94
95#[cfg(test)]
96mod tests {
97    use super::*;
98    use std::io::Cursor;
99
100    #[test]
101    fn test_read_single_message() {
102        let input = Cursor::new(b"{\"command\":\"test\"}\n");
103        let msg = CmdReader::read_single_message(input).unwrap();
104        assert_eq!(String::from_utf8(msg).unwrap(), "{\"command\":\"test\"}");
105    }
106
107    #[test]
108    fn test_read_single_message_invalid_json() {
109        let input = Cursor::new(b"not json\n");
110        let result = CmdReader::read_single_message(input);
111        assert!(result.is_err());
112        assert!(result.unwrap_err().contains("invalid JSON"));
113    }
114
115    #[test]
116    fn test_read_single_message_empty() {
117        let input = Cursor::new(b"\n");
118        let result = CmdReader::read_single_message(input);
119        assert!(result.is_err());
120    }
121
122    #[test]
123    fn test_read_all() {
124        let input = Cursor::new(b"{\"data\": \"hello world\"}");
125        let msg = CmdReader::read_all(input).unwrap();
126        assert_eq!(
127            String::from_utf8(msg).unwrap(),
128            "{\"data\": \"hello world\"}"
129        );
130    }
131
132    #[tokio::test]
133    async fn test_start_reading() {
134        let (tx, mut rx) = mpsc::channel(10);
135        let reader = CmdReader::new(tx);
136
137        let input = Cursor::new(b"{\"cmd\":\"a\"}\n{\"cmd\":\"b\"}\nnot-json\n{\"cmd\":\"c\"}\n");
138        let handle = reader.start_reading(input);
139
140        let msg1 = rx.recv().await.unwrap();
141        assert_eq!(String::from_utf8(msg1).unwrap(), "{\"cmd\":\"a\"}");
142
143        let msg2 = rx.recv().await.unwrap();
144        assert_eq!(String::from_utf8(msg2).unwrap(), "{\"cmd\":\"b\"}");
145
146        // "not-json" is skipped
147        let msg3 = rx.recv().await.unwrap();
148        assert_eq!(String::from_utf8(msg3).unwrap(), "{\"cmd\":\"c\"}");
149
150        handle.join().unwrap().unwrap();
151    }
152}